package com.xiaofan.apitest.sink

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Example Flink job: reads comma-separated sensor records from a text file,
 * converts each line into a SensorReading and writes it to Redis through a RedisSink.
 *
 * Optional command-line arguments (each one falls back to the built-in default
 * when absent, so running without arguments behaves as before):
 *   1. path of the input text file
 *   2. Redis host
 *   3. Redis port (must be an integer; a non-numeric value fails fast with
 *      a NumberFormatException)
 */
object RedisSinkTest {
  // Defaults used when the corresponding command-line argument is not supplied.
  private val DefaultInputPath = "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
  private val DefaultRedisHost = "192.168.157.11"
  private val DefaultRedisPort = 6379

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional overrides: input path, Redis host, Redis port (in that order)
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Resolve runtime settings, preferring command-line values over the defaults.
    val inputPath: String = args.lift(0).getOrElse(DefaultInputPath)
    val redisHost: String = args.lift(1).getOrElse(DefaultRedisHost)
    val redisPort: Int = args.lift(2).map(_.trim.toInt).getOrElse(DefaultRedisPort)

    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to look like "<id>,<timestamp>,<temperature>".
    // Fields are trimmed so that whitespace around the commas does not break
    // the numeric conversions (toLong rejects surrounding whitespace).
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",").map(_.trim)
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    // Define a FlinkJedisConfigBase (connection settings for the Redis sink).
    val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost(redisHost)
      .setPort(redisPort)
      .build()

    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper()))

    env.execute("redis sink test")
  }
}

/**
 * A RedisMapper for SensorReading records.
 *
 * It tells the Redis sink which command to issue and how to derive the key
 * and the value from each record.
 */
class MyRedisMapper extends RedisMapper[SensorReading] {
  // Command used to store the data in Redis: HSET <table name> <field> <value>.
  override def getCommandDescription: RedisCommandDescription = {
    val tableName = "sensor_temp"
    new RedisCommandDescription(RedisCommand.HSET, tableName)
  }

  // The sensor id is used as the key.
  override def getKeyFromData(t: SensorReading): String = {
    val sensorId = t.id
    sensorId
  }

  // The temperature, rendered as a string, is used as the value.
  override def getValueFromData(t: SensorReading): String = {
    val temperatureText = String.valueOf(t.temperature)
    temperatureText
  }
}
